package com.tdy.cdc.app.dwd;

import com.tdy.cdc.app.BaseSqlApp;
import com.tdy.cdc.common.Constant;
import com.tdy.cdc.util.FlinkSinkUtil;
import com.tdy.cdc.util.FlinkSourceUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

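/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/04/03/10:39
 * @Description: Flink SQL job that creates its source tables, joins them, and writes the
 *               join results join_result13 .. join_result16 into the tables
 *               target13 .. target16 (see writeToKafka).
 */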
public class App_13_16 extends BaseSqlApp {
    /**
     * Program entry point: instantiates this job and hands control to {@code init}.
     *
     * <p>NOTE(review): the three arguments are presumably a port, a job/checkpoint name
     * and a parallelism value — confirm against {@code BaseSqlApp#init}.
     *
     * @param args command-line arguments; not read by this method
     */
    public static void main(String[] args) {
        final App_13_16 app = new App_13_16();
        app.init(4004, "App_13_16", 2);
    }
    /**
     * Runs the job's three stages in order: register the source tables, build the
     * joined views, then write those views to the Kafka-backed target tables.
     *
     * <p>The order matters: the join stage reads the source tables, and the write stage
     * reads the {@code join_result*} views registered by the join stage.
     *
     * @param env      the streaming execution environment; not used by this method
     * @param tableEnv the table environment every stage operates on
     */
    @Override
    protected void invoke(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) {
        // 1 Create the source tables
        createSourceTables(tableEnv);
        // 2 Join
        joinTables(tableEnv);
        // 3 Write to Kafka
        writeToKafka(tableEnv);
    }

    /**
     * Registers the four target tables (target13 .. target16) and then submits one
     * statement set that copies each {@code join_result*} view into its target table.
     *
     * <p>All four inserts are {@code select *}, so the column order of each target table
     * has to line up with the column order of the corresponding view.
     *
     * @param tableEnv table environment in which the {@code join_result13} ..
     *                 {@code join_result16} views must already be registered
     */
    private void writeToKafka(StreamTableEnvironment tableEnv) {
        // Register the Kafka-backed target tables, one DDL statement per table.
        tableEnv.executeSql(buildTarget13Ddl());
        tableEnv.executeSql(buildTarget14Ddl());
        tableEnv.executeSql(buildTarget15Ddl());
        tableEnv.executeSql(buildTarget16Ddl());

        // Load the data: collect all inserts into a single statement set and run them together.
        final String[] insertStatements = {
                "insert into target13 select * from join_result13",
                "insert into target14 select * from join_result14",
                "insert into target15 select * from join_result15",
                "insert into target16 select * from join_result16"
        };
        StreamStatementSet inserts = tableEnv.createStatementSet();
        for (String insertSql : insertStatements) {
            inserts.addInsertSql(insertSql);
        }
        inserts.execute();
    }

    /**
     * Builds the CREATE TABLE statement for target13, bound to
     * {@code Constant.TOPIC_DWD_MHS_IPT_RELIEF_SHIFT_RECORD}.
     *
     * <p>NOTE(review): this table lists CUSTODIAN_NAME before CUSTODIAN_CODE (the other
     * three tables list the code first) and ADMIT_CHINA_SYNDROME_NAME before
     * ADMIT_CHINA_SYNDROME_CODE. Because the insert is {@code select *}, values are
     * presumably matched to these columns by position — verify against the aliases in
     * join_result13, which pair custodian_code with CUSTODIAN_NAME and diag_code with
     * ADMIT_CHINA_SYNDROME_NAME.
     */
    private static String buildTarget13Ddl() {
        return "CREATE TABLE target13 ("+
                "    RECORD_ID                             STRING     "+
                "    ,HOSPITAL_NAME                         STRING     "+
                "    ,HOSPITAL_CODE                         STRING     "+
                "    ,AGE_UNIT                              STRING     "+
                "    ,AGE                                   STRING     "+
                "    ,SEX_NAME                              STRING     "+
                "    ,SEX_CODE                              STRING     "+
                "    ,ID_CARD_NUMBER                        STRING     "+
                "    ,PATIENT_VISIT_CATEGORY_CODE           STRING     "+
                "    ,PATIENT_VISIT_CATEGORY                STRING     "+
                "    ,INPATIENT_VISIT_FLOW_ID               STRING     "+
                "    ,INPATIENT_NO                          STRING     "+
                "    ,PATIENT_ID                            STRING     "+
                "    ,PATIENT_NAME                          STRING     "+
                "    ,DOMAIN_ID                             STRING     "+
                "    ,VISIT_DATE                            STRING     "+
                "    ,VISIT_TIMES                           STRING     "+
                "    ,BIRTH_DATE                            STRING     "+
                "    ,AUTHOR_CODE                           STRING     "+
                "    ,AUTHOR_NAME                           STRING     "+
                "    ,CUSTODIAN_NAME                        STRING     "+
                "    ,CUSTODIAN_CODE                        STRING     "+
                "    ,TURN_DATE                             STRING     "+
                "    ,TURN_NAME                             STRING     "+
                "    ,TURN_CODE                             STRING     "+
                "    ,SUCCESSION_DATE                       STRING     "+
                "    ,SUCCESSOR_CODE                        STRING     "+
                "    ,SUCCESSOR_SIGN                        STRING     "+
                "    ,ADMIT_DATETIME                        STRING     "+
                "    ,BED_NO                                STRING     "+
                "    ,BED_NAME                              STRING     "+
                "    ,ROOM_NO                               STRING     "+
                "    ,ROOM_NAME                             STRING     "+
                "    ,WARD_ID                               STRING     "+
                "    ,WARD_NAME                             STRING     "+
                "    ,DEPT_ID                               STRING     "+
                "    ,DEPT_NAME                             STRING     "+
                "    ,CHIEF_COMPLAINT                       STRING     "+
                "    ,ADMIT_CONDITION                       STRING     "+
                "    ,CURRENT_SITUATION                     STRING     "+
                "    ,CHINA_MED_OBSERVE_RESULT              STRING     "+
                "    ,SUCCESSION_TREATMENT_PLAN             STRING     "+
                "    ,THERAPEUTIC_PRINCIPLE                 STRING     "+
                "    ,NEEDING_ATTENTION                     STRING     "+
                "    ,TREAT_PROCESS_DESC                    STRING     "+
                "    ,ADMIT_DIAG_ICD_CODE                  STRING     "+
                "    ,ADMIT_DIAG_ICD_NAME                  STRING     "+
                "    ,ADMIT_CHINA_DISEASE_CODE             STRING     "+
                "    ,ADMIT_CHINA_DISEASE_NAME             STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_NAME            STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_CODE            STRING     "+
                "    ,PRESENT_DIAG_ICD_CODE                STRING     "+
                "    ,PRESENT_DIAG_ICD_NAME                STRING     "+
                "    ,PRESENT_CHINA_DISEASE_CODE           STRING     "+
                "    ,PRESENT_CHINA_DISEASE_NAME           STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_CODE          STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_NAME          STRING     " +
                "    ,XDS_TYPE                             STRING    " +
                "    ,XDS_NAME                             STRING    " +
                "    ,XDS_VERSION                          STRING    " +
                "    ,VISIT_ID                             STRING     "+
                "    ,table_name                           STRING     " +
                "    ,PRIMARY KEY (RECORD_ID) NOT ENFORCED            "+
                ") "+ FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_RELIEF_SHIFT_RECORD);
    }

    /**
     * Builds the CREATE TABLE statement for target14, bound to
     * {@code Constant.TOPIC_DWD_MHS_IPT_TRANSFERRED_RECORD}.
     */
    private static String buildTarget14Ddl() {
        return "CREATE TABLE target14 ( "+
                "    RECORD_ID                          STRING     "+
                "    ,HOSPITAL_NAME                     STRING     "+
                "    ,HOSPITAL_CODE                     STRING     "+
                "    ,AGE_UNIT                          STRING     "+
                "    ,AGE                               STRING     "+
                "    ,SEX_NAME                          STRING     "+
                "    ,SEX_CODE                          STRING     "+
                "    ,ID_CARD_NUMBER                    STRING     "+
                "    ,PATIENT_VISIT_CATEGORY_CODE       STRING     "+
                "    ,PATIENT_VISIT_CATEGORY            STRING     "+
                "    ,INPATIENT_VISIT_FLOW_ID           STRING     "+
                "    ,INPATIENT_NO                      STRING     "+
                "    ,PATIENT_ID                        STRING     "+
                "    ,PATIENT_NAME                      STRING     "+
                "    ,DOMAIN_ID                         STRING     "+
                "    ,VISIT_DATE                        STRING     "+
                "    ,VISIT_TIMES                       STRING     "+
                "    ,AUTHOR_CODE                       STRING     "+
                "    ,AUTHOR_NAME                       STRING     "+
                "    ,CUSTODIAN_CODE                    STRING     "+
                "    ,CUSTODIAN_NAME                    STRING     "+
                "    ,TRANSFER_OUT_DATE                 STRING     "+
                "    ,TRANSFER_OUT_CODE                 STRING     "+
                "    ,TRANSFER_OUT_NAME                 STRING     "+
                "    ,TURN_DATE                         STRING     "+
                "    ,TRANSFER_IN_CODE                  STRING     "+
                "    ,TRANSFER_IN_NAME                  STRING     "+
                "    ,ADMIT_DATETIME                    STRING     "+
                "    ,BED_NO                            STRING     "+
                "    ,BED_NAME                          STRING     "+
                "    ,ROOM_NO                           STRING     "+
                "    ,ROOM_NAME                         STRING     "+
                "    ,WARD_ID                           STRING     "+
                "    ,WARD_NAME                         STRING     "+
                "    ,DEPT_ID                           STRING     "+
                "    ,DEPT_NAME                         STRING     "+
                "    ,CHIEF_COMPLAINT                   STRING     "+
                "    ,ADMIT_CONDITION                   STRING     "+
                "    ,CURRENT_SITUATION                 STRING     "+
                "    ,CHINA_MED_OBSERVE_RESULT          STRING     "+
                "    ,TRANSFER_INTREATMENT_PLAN         STRING     "+
                "    ,THERAPEUTIC_PRINCIPLE             STRING     "+
                "    ,NEEDING_ATTENTION                 STRING     "+
                "    ,TRANSFER_RECORD_TYPECODE          STRING     "+
                "    ,TRANSFER_RECORD_TYPE              STRING     "+
                "    ,TRANSFER_OUT_DEPT                 STRING     "+
                "    ,INDEPT_NAME                       STRING     "+
                "    ,TRANSFER_PURPOSES                 STRING     "+
                "    ,ORDER_CONTENT                     STRING     "+
                "    ,CHINA_DECOCTING_METHOD            STRING     "+
                "    ,CHINA_USED_METHOD                 STRING     "+
                "    ,TREAT_PROCESS_DESC                STRING     "+
                "    ,ADMIT_DIAG_ICD_CODE               STRING     "+
                "    ,ADMIT_DIAG_ICD_NAME               STRING     "+
                "    ,ADMIT_CHINA_DISEASE_CODE          STRING     "+
                "    ,ADMIT_CHINA_DISEASE_NAME          STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_NAME         STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_CODE         STRING     "+
                "    ,PRESENT_DIAG_ICD_CODE             STRING     "+
                "    ,PRESENT_DIAG_ICD_NAME             STRING     "+
                "    ,PRESENT_CHINA_DISEASE_CODE        STRING     "+
                "    ,PRESENT_CHINA_DISEASE_NAME        STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_CODE       STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_NAME       STRING     " +
                "    ,XDS_TYPE                          STRING    " +
                "    ,XDS_NAME                          STRING    " +
                "    ,XDS_VERSION                       STRING    " +
                "    ,VISIT_ID                          STRING     "+
                "    ,table_name                        STRING     " +
                "    ,PRIMARY KEY (RECORD_ID) NOT ENFORCED         "+
                ") "+ FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_TRANSFERRED_RECORD);
    }

    /**
     * Builds the CREATE TABLE statement for target15, bound to
     * {@code Constant.TOPIC_DWD_MHS_IPT_STAGE_SUMMARY}.
     */
    private static String buildTarget15Ddl() {
        return "CREATE TABLE target15 ( "+
                "    RECORD_ID                              STRING     "+
                "    ,HOSPITAL_NAME                         STRING     "+
                "    ,HOSPITAL_CODE                         STRING     "+
                "    ,AGE_UNIT                              STRING     "+
                "    ,AGE                                   STRING     "+
                "    ,SEX_NAME                              STRING     "+
                "    ,SEX_CODE                              STRING     "+
                "    ,ID_CARD_NUMBER                        STRING     "+
                "    ,PATIENT_VISIT_CATEGORY_CODE           STRING     "+
                "    ,PATIENT_VISIT_CATEGORY                STRING     "+
                "    ,INPATIENT_VISIT_FLOW_ID               STRING     "+
                "    ,INPATIENT_NO                          STRING     "+
                "    ,PATIENT_ID                            STRING     "+
                "    ,PATIENT_NAME                          STRING     "+
                "    ,DOMAIN_ID                             STRING     "+
                "    ,VISIT_DATE                            STRING     "+
                "    ,VISIT_TIMES                           STRING     "+
                "    ,BIRTH_DATE                            STRING     "+
                "    ,AUTHOR_CODE                           STRING     "+
                "    ,AUTHOR_NAME                           STRING     "+
                "    ,CUSTODIAN_CODE                        STRING     "+
                "    ,CUSTODIAN_NAME                        STRING     "+
                "    ,ELEC_SIGN_DATETIME                    STRING     "+
                "    ,DOCTOR_ID                             STRING     "+
                "    ,DOCTOR_SIGN_NAME                      STRING     "+
                "    ,BRIEF_SUMMARY_DATETIME                STRING     "+
                "    ,ADMIT_DATETIME                        STRING     "+
                "    ,BED_NO                                STRING     "+
                "    ,BED_NAME                              STRING     "+
                "    ,ROOM_NO                               STRING     "+
                "    ,ROOM_NAME                             STRING     "+
                "    ,WARD_ID                               STRING     "+
                "    ,WARD_NAME                             STRING     "+
                "    ,DEPT_ID                               STRING     "+
                "    ,DEPT_NAME                             STRING     "+
                "    ,CHIEF_COMPLAINT                       STRING     "+
                "    ,ADMIT_CONDITION                       STRING     "+
                "    ,PRESENT_CONDITION                     STRING     "+
                "    ,CHINA_MED_OBSERVE_RESULT              STRING     "+
                "    ,ADMIT_DIAG_ICD_CODE                   STRING     "+
                "    ,ADMIT_DIAG_ICD_NAME                   STRING     "+
                "    ,ADMIT_CHINA_DISEASE_CODE              STRING     "+
                "    ,ADMIT_CHINA_DISEASE_NAME              STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_CODE             STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_NAME             STRING     "+
                "    ,NEXT_TREAT_SCHEME                     STRING     "+
                "    ,THERAPEUTIC_PRINCIPLE                 STRING     "+
                "    ,ORDERS_CONTENT                        STRING     "+
                "    ,CHINA_DECOCTING_METHOD                STRING     "+
                "    ,CHINA_USED_METHOD                     STRING     "+
                "    ,PRESENT_DIAG_ICD_CODE                 STRING     "+
                "    ,PRESENT_DIAG_ICD_NAME                 STRING     "+
                "    ,PRESENT_CHINA_DISEASE_CODE            STRING     "+
                "    ,PRESENT_CHINA_DISEASE_NAME            STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_CODE           STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_NAME           STRING     " +
                "    ,XDS_TYPE                              STRING    " +
                "    ,XDS_NAME                              STRING    " +
                "    ,XDS_VERSION                           STRING    " +
                "    ,VISIT_ID                              STRING     "+
                "    ,table_name                            STRING     " +
                "    ,PRIMARY KEY (RECORD_ID) NOT ENFORCED             "+
                ") "+ FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_STAGE_SUMMARY);
    }

    /**
     * Builds the CREATE TABLE statement for target16, bound to
     * {@code Constant.TOPIC_DWD_MHS_IPT_PREOPERATION_SUMMARY}.
     */
    private static String buildTarget16Ddl() {
        return "CREATE TABLE target16 ("+
                "    RECORD_ID                               STRING    "+
                "    ,HOSPITAL_NAME                          STRING    "+
                "    ,HOSPITAL_CODE                          STRING    "+
                "    ,AGE_UNIT                               STRING    "+
                "    ,AGE                                    STRING    "+
                "    ,SEX_NAME                               STRING    "+
                "    ,SEX_CODE                               STRING    "+
                "    ,ID_CARD_NUMBER                         STRING    "+
                "    ,PATIENT_VISIT_CATEGORY_CODE            STRING    "+
                "    ,PATIENT_VISIT_CATEGORY                 STRING    "+
                "    ,INPATIENT_VISIT_FLOW_ID                STRING    "+
                "    ,INPATIENT_NO                           STRING    "+
                "    ,PATIENT_ID                             STRING    "+
                "    ,PATIENT_NAME                           STRING    "+
                "    ,DOMAIN_ID                              STRING    "+
                "    ,VISIT_DATE                             STRING    "+
                "    ,VISIT_TIMES                            STRING    "+
                "    ,SUMMARY_DATETIME                       STRING    "+
                "    ,AUTHOR_CODE                            STRING    "+
                "    ,AUTHOR_NAME                            STRING    "+
                "    ,CUSTODIAN_CODE                         STRING    "+
                "    ,CUSTODIAN_NAME                         STRING    "+
                "    ,OPER_DOCTOR_SIGN_DATETIME              STRING    "+
                "    ,OPER_DOCTOR_ID                         STRING    "+
                "    ,OPER_DOCTOR_SIGN_NAME                  STRING    "+
                "    ,ELEC_SIGN_DATETIME                     STRING    "+
                "    ,DOCTOR_ID                              STRING    "+
                "    ,DOCTOR_SIGN_NAME                       STRING    "+
                "    ,LINKMAN_PHONE                          STRING    "+
                "    ,LINKMAN_NAME                           STRING    "+
                "    ,ADMIT_DATETIME                         STRING    "+
                "    ,DISCHARGE_DATETIME                     STRING    "+
                "    ,BED_NO                                 STRING    "+
                "    ,BED_NAME                               STRING    "+
                "    ,ROOM_NO                                STRING    "+
                "    ,ROOM_NAME                              STRING    "+
                "    ,WARD_ID                                STRING    "+
                "    ,WARD_NAME                              STRING    "+
                "    ,DEPT_ID                                STRING    "+
                "    ,DEPT_NAME                              STRING    "+
                "    ,MEDICAL_SUMMARY                        STRING    "+
                "    ,DISEASE_DIAGNOSIS_CODE                 STRING    "+
                "    ,PRE_OPER_DIAG_ICD_NAME                 STRING    "+
                "    ,DIAG_BASIS                             STRING    "+
                "    ,ALLERGIC_HISTORY_FLAG                  STRING    "+
                "    ,ALLERGIC_HISTORY                       STRING    "+
                "    ,ASSIST_EXAM_RESULT                     STRING    "+
                "    ,OPER_INDICATION                        STRING    "+
                "    ,OPER_CONTRAINDICATION                  STRING    "+
                "    ,SURGICAL_INDICATION                    STRING    "+
                "    ,CONSULTATION_OPINION                   STRING    "+
                "    ,INTEND_OPER_CM3_CODE                   STRING    "+
                "    ,INTEND_OPER_CM3_NAME                   STRING    "+
                "    ,INTEND_OPER_SITE_NAME                  STRING    "+
                "    ,INTEND_OPER_DATETIME                   STRING    "+
                "    ,INTEND_ANES_METHOD_CODE                STRING    "+
                "    ,INTEND_ANES_METHOD_NAME                STRING    "+
                "    ,CONSIDERATIONS                         STRING    "+
                "    ,OPER_KEY_POINT                         STRING    "+
                "    ,PRE_OPER_PREPARATION                   STRING    " +
                "    ,XDS_TYPE                               STRING    " +
                "    ,XDS_NAME                               STRING    " +
                "    ,XDS_VERSION                            STRING    " +
                "    ,VISIT_ID                               STRING    "+
                "    ,table_name                           STRING     " +
                "    ,PRIMARY KEY (RECORD_ID) NOT ENFORCED             "+
                ") "+ FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_PREOPERATION_SUMMARY);
    }

    private void joinTables(StreamTableEnvironment tableEnv) {
        // 1 设置数据的TTL（暂时10min）
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(30 * 60));
        // 2 join操作
        Table joinResult13 = tableEnv.sqlQuery("select a.unique_id    AS    RECORD_ID                                "+
                "    ,a.hospital_name                AS      HOSPITAL_NAME                                                            "+
                "    ,a.hospital_code                AS      HOSPITAL_CODE                                                            "+
                "    ,a.age_unit                     AS      AGE_UNIT                                                                 "+
                "    ,a.age                          AS      AGE                                                                      "+
                "    ,a.sex                          AS      SEX_NAME                                                                 "+
                "    ,a.sex_code                     AS      SEX_CODE                                                                 "+
                "    ,a.identify_no                  AS      ID_CARD_NUMBER                                                           "+
                "    ,a.patient_typecode             AS      PATIENT_VISIT_CATEGORY_CODE                                              "+
                "    ,a.patient_type                 AS      PATIENT_VISIT_CATEGORY                                                   "+
                "    ,a.visit_id                     AS      INPATIENT_VISIT_FLOW_ID                                                  "+
                "    ,a.inpatient                    AS      INPATIENT_NO                                                             "+
                "    ,a.patient_id                   AS      PATIENT_ID                                                               "+
                "    ,a.patient_name                 AS      PATIENT_NAME                                                             "+
                "    ,a.domain_id                    AS      DOMAIN_ID                                                                "+
                "    ,a.visit_date                   AS      VISIT_DATE                                                               "+
                "    ,a.visit_times                  AS      VISIT_TIMES                                                              "+
                "    ,a.birth                        AS      BIRTH_DATE                                                               "+
                "    ,a.author_code                  AS      AUTHOR_CODE                                                              "+
                "    ,a.author_name                  AS      AUTHOR_NAME                                                              "+
                "    ,a.custodian_code               AS      CUSTODIAN_NAME                                                           "+
                "    ,a.custodian_name               AS      CUSTODIAN_CODE                                                           "+
                "    ,a.turn_data                    AS      TURN_DATE                                                                "+
                "    ,a.turn_name                    AS      TURN_NAME                                                                "+
                "    ,a.turn_code                    AS      TURN_CODE                                                                "+
                "    ,a.succession_date              AS      SUCCESSION_DATE                                                          "+
                "    ,a.successor_code               AS      SUCCESSOR_CODE                                                           "+
                "    ,a.successor_sign               AS      SUCCESSOR_SIGN                                                           "+
                "    ,a.admission_date               AS      ADMIT_DATETIME                                                           "+
                "    ,a.bed_no                       AS      BED_NO                                                                   "+
                "    ,a.bed_name                     AS      BED_NAME                                                                 "+
                "    ,a.ward_id                      AS      ROOM_NO                                                                  "+
                "    ,a.ward_name                    AS      ROOM_NAME                                                                "+
                "    ,a.wards_id                     AS      WARD_ID                                                                  "+
                "    ,a.wards_name                   AS      WARD_NAME                                                                "+
                "    ,a.dept_code                    AS      DEPT_ID                                                                  "+
                "    ,a.dept_name                    AS      DEPT_NAME                                                                "+
                "    ,a.chief_complaint              AS      CHIEF_COMPLAINT                                                          "+
                "    ,a.admission_status             AS      ADMIT_CONDITION                                                          "+
                "    ,a.current_situation            AS      CURRENT_SITUATION                                                        "+
                "    ,a.tcm_four_diagnosis           AS      CHINA_MED_OBSERVE_RESULT                                                 "+
                "    ,a.succession_treatment_plan    AS      SUCCESSION_TREATMENT_PLAN                                                "+
                "    ,a.principle_and_method         AS      THERAPEUTIC_PRINCIPLE                                                    "+
                "    ,a.needing_attention            AS      NEEDING_ATTENTION                                                        "+
                "    ,a.treatment_process            AS      TREAT_PROCESS_DESC                                                       "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_code else '' end     AS     ADMIT_DIAG_ICD_CODE              "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_name else '' end     AS     ADMIT_DIAG_ICD_NAME              "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_code else '' end     AS     ADMIT_CHINA_DISEASE_CODE         "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_name else '' end     AS     ADMIT_CHINA_DISEASE_NAME         "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_code else '' end     AS     ADMIT_CHINA_SYNDROME_NAME        "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_name else '' end     AS     ADMIT_CHINA_SYNDROME_CODE        "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_code else '' end     AS     PRESENT_DIAG_ICD_CODE            "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_name else '' end     AS     PRESENT_DIAG_ICD_NAME            "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_code else '' end     AS     PRESENT_CHINA_DISEASE_CODE       "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_name else '' end     AS     PRESENT_CHINA_DISEASE_NAME       "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_code else '' end     AS     PRESENT_CHINA_SYNDROME_CODE      "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_name else '' end     AS     PRESENT_CHINA_SYNDROME_NAME      " +
                "    ,a.xds_type as XDS_TYPE  " +
                "    ,a.xds_name as XDS_NAME  " +
                "    ,a.xds_version as XDS_VERSION  " +
                "    ,visit_id AS VISIT_ID    " +
                "    ,'dwd_mhs_ipt_relief_shift_record' as table_name "+
                " from hdsd00_14_09 a                                                                                                 "+
                " left join tdy_list_diag b                                                                                           "+
                " on a.unique_id = b.unique_id and a.xds_type = b.xds_type                                                          ");
        tableEnv.createTemporaryView("join_result13",joinResult13);

        Table joinResult14 = tableEnv.sqlQuery("select a.unique_id    AS     RECORD_ID                                     "+
                "    ,a.hospital_name                 AS     HOSPITAL_NAME                                                           "+
                "    ,a.hospital_code                 AS     HOSPITAL_CODE                                                           "+
                "    ,a.age_unit                      AS     AGE_UNIT                                                                "+
                "    ,a.age                           AS     AGE                                                                     "+
                "    ,a.sex                           AS     SEX_NAME                                                                "+
                "    ,a.sex_code                      AS     SEX_CODE                                                                "+
                "    ,a.identify_no                   AS     ID_CARD_NUMBER                                                          "+
                "    ,a.patient_typecode              AS     PATIENT_VISIT_CATEGORY_CODE                                             "+
                "    ,a.patient_type                  AS     PATIENT_VISIT_CATEGORY                                                  "+
                "    ,a.visit_id                      AS     INPATIENT_VISIT_FLOW_ID                                                 "+
                "    ,a.inpatient                     AS     INPATIENT_NO                                                            "+
                "    ,a.patient_id                    AS     PATIENT_ID                                                              "+
                "    ,a.patient_name                  AS     PATIENT_NAME                                                            "+
                "    ,a.domain_id                     AS     DOMAIN_ID                                                               "+
                "    ,a.visit_date                    AS     VISIT_DATE                                                              "+
                "    ,a.visit_times                   AS     VISIT_TIMES                                                             "+
                "    ,a.author_code                   AS     AUTHOR_CODE                                                             "+
                "    ,a.author_name                   AS     AUTHOR_NAME                                                             "+
                "    ,a.custodian_code                AS     CUSTODIAN_CODE                                                          "+
                "    ,a.custodian_name                AS     CUSTODIAN_NAME                                                          "+
                "    ,a.transfer_out_date             AS     TRANSFER_OUT_DATE                                                       "+
                "    ,a.transfer_out_code             AS     TRANSFER_OUT_CODE                                                       "+
                "    ,a.transfer_out_name             AS     TRANSFER_OUT_NAME                                                       "+
                "    ,a.turn_date                     AS     TURN_DATE                                                               "+
                "    ,a.transfer_in_code              AS     TRANSFER_IN_CODE                                                        "+
                "    ,a.transfer_in_name              AS     TRANSFER_IN_NAME                                                        "+
                "    ,a.admission_date                AS     ADMIT_DATETIME                                                          "+
                "    ,a.bed_no                        AS     BED_NO                                                                  "+
                "    ,a.bed_name                      AS     BED_NAME                                                                "+
                "    ,a.ward_id                       AS     ROOM_NO                                                                 "+
                "    ,a.ward_name                     AS     ROOM_NAME                                                               "+
                "    ,a.wards_id                      AS     WARD_ID                                                                 "+
                "    ,a.wards_name                    AS     WARD_NAME                                                               "+
                "    ,a.dept_code                     AS     DEPT_ID                                                                 "+
                "    ,a.dept_name                     AS     DEPT_NAME                                                               "+
                "    ,a.chief_complaint               AS     CHIEF_COMPLAINT                                                         "+
                "    ,a.admission_status              AS     ADMIT_CONDITION                                                         "+
                "    ,a.current_situation             AS     CURRENT_SITUATION                                                       "+
                "    ,a.tcm_four_diagnosis            AS     CHINA_MED_OBSERVE_RESULT                                                "+
                "    ,a.transfer_intreatment_plan     AS     TRANSFER_INTREATMENT_PLAN                                               "+
                "    ,a.principle_and_method          AS     THERAPEUTIC_PRINCIPLE                                                   "+
                "    ,a.needing_attention             AS     NEEDING_ATTENTION                                                       "+
                "    ,a.transfer_record_typecode      AS     TRANSFER_RECORD_TYPECODE                                                "+
                "    ,a.transfer_record_type          AS     TRANSFER_RECORD_TYPE                                                    "+
                "    ,a.transfer_out_dept             AS     TRANSFER_OUT_DEPT                                                       "+
                "    ,a.indept_name                   AS     INDEPT_NAME                                                             "+
                "    ,a.transfer_purposes             AS     TRANSFER_PURPOSES                                                       "+
                "    ,a.order_content                 AS     ORDER_CONTENT                                                           "+
                "    ,a.decoct_method                 AS     CHINA_DECOCTING_METHOD                                                  "+
                "    ,a.tcm_drug_use                  AS     CHINA_USED_METHOD                                                       "+
                "    ,a.treatment_process             AS     TREAT_PROCESS_DESC                                                      "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_code else '' end      AS     ADMIT_DIAG_ICD_CODE            "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_name else '' end      AS     ADMIT_DIAG_ICD_NAME            "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_code else '' end      AS     ADMIT_CHINA_DISEASE_CODE       "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_name else '' end      AS     ADMIT_CHINA_DISEASE_NAME       "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_code else '' end      AS     ADMIT_CHINA_SYNDROME_NAME      "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_name else '' end      AS     ADMIT_CHINA_SYNDROME_CODE      "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_code else '' end      AS     PRESENT_DIAG_ICD_CODE          "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_name else '' end      AS     PRESENT_DIAG_ICD_NAME          "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_code else '' end      AS     PRESENT_CHINA_DISEASE_CODE     "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_name else '' end      AS     PRESENT_CHINA_DISEASE_NAME     "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_code else '' end      AS     PRESENT_CHINA_SYNDROME_CODE    "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_name else '' end      AS     PRESENT_CHINA_SYNDROME_NAME    " +
                "    ,a.xds_type as XDS_TYPE  " +
                "    ,a.xds_name as XDS_NAME  " +
                "    ,a.xds_version as XDS_VERSION  " +
                "    ,visit_id AS VISIT_ID" +
                "    ,'dwd_mhs_ipt_transferred_record' as table_name  "+
                " from hdsd00_14_05 a                                                                                                "+
                " left join tdy_list_diag b                                                                                          "+
                " on a.unique_id = b.unique_id and a.xds_type = b.xds_type                                                         ");
        tableEnv.createTemporaryView("join_result14",joinResult14);

        Table joinResult15 = tableEnv.sqlQuery("select    a.unique_id    AS    RECORD_ID                                 "+
                "    ,a.hospital_name                 AS    HOSPITAL_NAME                                                               "+
                "    ,a.hospital_code                 AS    HOSPITAL_CODE                                                               "+
                "    ,a.age_unit                      AS    AGE_UNIT                                                                    "+
                "    ,a.age                           AS    AGE                                                                         "+
                "    ,a.sex                           AS    SEX_NAME                                                                    "+
                "    ,a.sex_code                      AS    SEX_CODE                                                                    "+
                "    ,a.identify_no                   AS    ID_CARD_NUMBER                                                              "+
                "    ,a.patient_typecode              AS    PATIENT_VISIT_CATEGORY_CODE                                                 "+
                "    ,a.patient_type                  AS    PATIENT_VISIT_CATEGORY                                                      "+
                "    ,a.visit_id                      AS    INPATIENT_VISIT_FLOW_ID                                                     "+
                "    ,a.inpatient                     AS    INPATIENT_NO                                                                "+
                "    ,a.patient_id                    AS    PATIENT_ID                                                                  "+
                "    ,a.patient_name                  AS    PATIENT_NAME                                                                "+
                "    ,a.domain_id                     AS    DOMAIN_ID                                                                   "+
                "    ,a.visit_date                    AS    VISIT_DATE                                                                  "+
                "    ,a.visit_times                   AS    VISIT_TIMES                                                                 "+
                "    ,a.birth                         AS    BIRTH_DATE                                                                  "+
                "    ,a.author_code                   AS    AUTHOR_CODE                                                                 "+
                "    ,a.author_name                   AS    AUTHOR_NAME                                                                 "+
                "    ,a.custodian_code                AS    CUSTODIAN_CODE                                                              "+
                "    ,a.custodian_name                AS    CUSTODIAN_NAME                                                              "+
                "    ,a.doctor_signdate               AS    ELEC_SIGN_DATETIME                                                          "+
                "    ,a.doctor_code                   AS    DOCTOR_ID                                                                   "+
                "    ,a.doctor_name                   AS    DOCTOR_SIGN_NAME                                                            "+
                "    ,a.summary_date                  AS    BRIEF_SUMMARY_DATETIME                                                      "+
                "    ,a.admission_date                AS    ADMIT_DATETIME                                                              "+
                "    ,a.bed_no                        AS    BED_NO                                                                      "+
                "    ,a.bed_name                      AS    BED_NAME                                                                    "+
                "    ,a.ward_id                       AS    ROOM_NO                                                                     "+
                "    ,a.ward_name                     AS    ROOM_NAME                                                                   "+
                "    ,a.wards_id                      AS    WARD_ID                                                                     "+
                "    ,a.wards_name                    AS    WARD_NAME                                                                   "+
                "    ,a.dept_code                     AS    DEPT_ID                                                                     "+
                "    ,a.dept_name                     AS    DEPT_NAME                                                                   "+
                "    ,a.chief_complaint               AS    CHIEF_COMPLAINT                                                             "+
                "    ,a.admission_status              AS    ADMIT_CONDITION                                                             "+
                "    ,a.current_situation             AS    PRESENT_CONDITION                                                           "+
                "    ,a.tcm_four_diagnosis            AS    CHINA_MED_OBSERVE_RESULT                                                    "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_code else '' end        AS       ADMIT_DIAG_ICD_CODE           "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_name else '' end        AS       ADMIT_DIAG_ICD_NAME           "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_code else '' end        AS       ADMIT_CHINA_DISEASE_CODE      "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_name else '' end        AS       ADMIT_CHINA_DISEASE_NAME      "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_code else '' end        AS       ADMIT_CHINA_SYNDROME_CODE     "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_name else '' end        AS       ADMIT_CHINA_SYNDROME_NAME     "+
                "    ,a.treatment_plan                AS    NEXT_TREAT_SCHEME                                                           "+
                "    ,a.principle_and_method          AS    THERAPEUTIC_PRINCIPLE                                                       "+
                "    ,a.order_content                 AS    ORDERS_CONTENT                                                              "+
                "    ,a.decoct_method                 AS    CHINA_DECOCTING_METHOD                                                      "+
                "    ,a.tcm_drug_use                  AS    CHINA_USED_METHOD                                                           "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_code else '' end        AS       PRESENT_DIAG_ICD_CODE         "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_name else '' end        AS       PRESENT_DIAG_ICD_NAME         "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_code else '' end        AS       PRESENT_CHINA_DISEASE_CODE    "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_name else '' end        AS       PRESENT_CHINA_DISEASE_NAME    "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_code else '' end        AS       PRESENT_CHINA_SYNDROME_CODE   "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_name else '' end        AS       PRESENT_CHINA_SYNDROME_NAME   " +
                "    ,a.xds_type as XDS_TYPE  " +
                "    ,a.xds_name as XDS_NAME  " +
                "    ,a.xds_version as XDS_VERSION  " +
                "    ,visit_id AS VISIT_ID" +
                "    ,'dwd_mhs_ipt_stage_summary' as table_name    "+
                " from hdsd00_14_01 a                                                                                                   "+
                " left join tdy_list_diag b                                                                                             "+
                " on a.unique_id = b.unique_id and a.xds_type = b.xds_type                                                            ");
        tableEnv.createTemporaryView("join_result15",joinResult15);

        Table joinResult16 = tableEnv.sqlQuery("select a.unique_id    AS    RECORD_ID                     "+
                "    ,a.hospital_name            AS       HOSPITAL_NAME                                              "+
                "    ,a.hospital_code            AS       HOSPITAL_CODE                                              "+
                "    ,a.age_unit                 AS       AGE_UNIT                                                   "+
                "    ,a.age                      AS       AGE                                                        "+
                "    ,a.sex                      AS       SEX_NAME                                                   "+
                "    ,a.sex_code                 AS       SEX_CODE                                                   "+
                "    ,a.identify_no              AS       ID_CARD_NUMBER                                             "+
                "    ,a.patient_typecode         AS       PATIENT_VISIT_CATEGORY_CODE                                "+
                "    ,a.patient_type             AS       PATIENT_VISIT_CATEGORY                                     "+
                "    ,a.visit_id                 AS       INPATIENT_VISIT_FLOW_ID                                    "+
                "    ,a.inpatient                AS       INPATIENT_NO                                               "+
                "    ,a.patient_id               AS       PATIENT_ID                                                 "+
                "    ,a.patient_name             AS       PATIENT_NAME                                               "+
                "    ,a.domain_id                AS       DOMAIN_ID                                                  "+
                "    ,a.visit_date               AS       VISIT_DATE                                                 "+
                "    ,a.visit_times              AS       VISIT_TIMES                                                "+
                "    ,a.effective_time           AS       SUMMARY_DATETIME                                           "+
                "    ,a.author_code              AS       AUTHOR_CODE                                                "+
                "    ,a.author_name              AS       AUTHOR_NAME                                                "+
                "    ,a.custodian_code           AS       CUSTODIAN_CODE                                             "+
                "    ,a.custodian_name           AS       CUSTODIAN_NAME                                             "+
                "    ,a.surgeon_signdate         AS       OPER_DOCTOR_SIGN_DATETIME                                  "+
                "    ,a.surgeon_code             AS       OPER_DOCTOR_ID                                             "+
                "    ,a.surgeon_name             AS       OPER_DOCTOR_SIGN_NAME                                      "+
                "    ,a.doctor_signdate          AS       ELEC_SIGN_DATETIME                                         "+
                "    ,a.doctor_code              AS       DOCTOR_ID                                                  "+
                "    ,a.doctor_name              AS       DOCTOR_SIGN_NAME                                           "+
                "    ,a.contact_number           AS       LINKMAN_PHONE                                              "+
                "    ,a.contact_name             AS       LINKMAN_NAME                                               "+
                "    ,a.admission_date           AS       ADMIT_DATETIME                                             "+
                "    ,a.out_date                 AS       DISCHARGE_DATETIME                                         "+
                "    ,a.bed_no                   AS       BED_NO                                                     "+
                "    ,a.bed_name                 AS       BED_NAME                                                   "+
                "    ,a.ward_id                  AS       ROOM_NO                                                    "+
                "    ,a.ward_name                AS       ROOM_NAME                                                  "+
                "    ,a.wards_id                 AS       WARD_ID                                                    "+
                "    ,a.wards_name               AS       WARD_NAME                                                  "+
                "    ,a.dept_code                AS       DEPT_ID                                                    "+
                "    ,a.dept_name                AS       DEPT_NAME                                                  "+
                "    ,a.record_summary           AS       MEDICAL_SUMMARY                                            "+
                "    ,case when b.diag_type='术前诊断' then b.diag_code else '' end  AS  DISEASE_DIAGNOSIS_CODE      "+
                "    ,case when b.diag_type='术前诊断' then b.diag_name else '' end  AS  PRE_OPER_DIAG_ICD_NAME      "+
                "    ,a.diagnostic_basis         AS       DIAG_BASIS                                                 "+
                "    ,a.allergy_history_mark     AS       ALLERGIC_HISTORY_FLAG                                      "+
                "    ,a.allergy_history          AS       ALLERGIC_HISTORY                                           "+
                "    ,a.item_result              AS       ASSIST_EXAM_RESULT                                         "+
                "    ,a.oper_indication          AS       OPER_INDICATION                                            "+
                "    ,a.contraindication         AS       OPER_CONTRAINDICATION                                      "+
                "    ,a.oper_surgical            AS       SURGICAL_INDICATION                                        "+
                "    ,a.consultation_opinion     AS       CONSULTATION_OPINION                                       "+
                "    ,a.oper_code                AS       INTEND_OPER_CM3_CODE                                       "+
                "    ,a.oper_name                AS       INTEND_OPER_CM3_NAME                                       "+
                "    ,a.target_name              AS       INTEND_OPER_SITE_NAME                                      "+
                "    ,a.oper_date                AS       INTEND_OPER_DATETIME                                       "+
                "    ,a.anes_method_code         AS       INTEND_ANES_METHOD_CODE                                    "+
                "    ,a.anes_name                AS       INTEND_ANES_METHOD_NAME                                    "+
                "    ,a.needing_attention        AS       CONSIDERATIONS                                             "+
                "    ,a.oper_key_point           AS       OPER_KEY_POINT                                             "+
                "    ,a.pre_preparation          AS       PRE_OPER_PREPARATION                                       " +
                "    ,a.xds_type as XDS_TYPE  " +
                "    ,a.xds_name as XDS_NAME  " +
                "    ,a.xds_version as XDS_VERSION  " +
                "    ,visit_id AS VISIT_ID    " +
                "    ,'dwd_mhs_ipt_preoperation_summary' as table_name   "+
                " from hdsd00_14_04 a                                                                                "+
                " left join tdy_list_diag b                                                                          "+
                " on a.unique_id = b.unique_id and a.xds_type = b.xds_type                                         ");
        tableEnv.createTemporaryView("join_result16",joinResult16);
    }

    private void createSourceTables(StreamTableEnvironment tableEnv) {
        // create table : hdsd00_14_09
        tableEnv.executeSql("create table hdsd00_14_09 (       "+
                "pk                              string," +
                "upload_time                     string," +
                "status                          string," +
                "empi                            string," +
                "encounter_id                    string," +
                "visit_date                      string," +
                "patient_id                      string," +
                "patient_domain                  string," +
                "visit_domain                    string," +
                "visit_id                        string," +
                "visit_times                     string," +
                "unique_id                       string," +
                "xds_type                        string," +
                "xds_name                        string," +
                "effective_time                  string," +
                "xds_version                     string," +
                "domain_id                       string," +
                "patient_type                    string," +
                "patient_typecode                string," +
                "inpatient                       string," +
                "identify_no                     string," +
                "patient_name                    string," +
                "sex_code                        string," +
                "sex                             string," +
                "birth                           string," +
                "age                             string," +
                "age_unit                        string," +
                "write_time                      string," +
                "author_code                     string," +
                "author_name                     string," +
                "custodian_name                  string," +
                "custodian_code                  string," +
                "turn_data                       string," +
                "turn_name                       string," +
                "turn_code                       string," +
                "succession_date                 string," +
                "successor_code                  string," +
                "successor_sign                  string," +
                "admission_date                  string," +
                "bed_no                          string," +
                "bed_name                        string," +
                "ward_id                         string," +
                "ward_name                       string," +
                "wards_id                        string," +
                "wards_name                      string," +
                "dept_code                       string," +
                "dept_name                       string," +
                "hospital_code                   string," +
                "hospital_name                   string," +
                "chief_complaint                 string," +
                "admission_status                string," +
                "current_situation               string," +
                "tcm_four_diagnosis              string," +
                "succession_treatment_plan       string," +
                "principle_and_method            string," +
                "needing_attention               string," +
                "treatment_process               string" +
                ")" + FlinkSourceUtil.getKafkaWith("hdsd00_14_09","App_13_16"));
        // create table : hdsd00_14_05
        tableEnv.executeSql("create table hdsd00_14_05 (         "+
                "pk                           string," +
                "upload_time                  string," +
                "status                       string," +
                "empi                         string," +
                "encounter_id                 string," +
                "visit_date                   string," +
                "visit_domain                 string," +
                "visit_id                     string," +
                "visit_times                  string," +
                "patient_id                   string," +
                "patient_domain               string," +
                "unique_id                    string," +
                "xds_type                     string," +
                "xds_name                     string," +
                "effective_time               string," +
                "xds_version                  string," +
                "domain_id                    string," +
                "patient_type                 string," +
                "patient_typecode             string," +
                "inpatient                    string," +
                "identify_no                  string," +
                "patient_name                 string," +
                "sex_code                     string," +
                "sex                          string," +
                "age                          string," +
                "age_unit                     string," +
                "write_time                   string," +
                "author_code                  string," +
                "author_name                  string," +
                "custodian_code               string," +
                "custodian_name               string," +
                "transfer_out_date            string," +
                "transfer_out_code            string," +
                "transfer_out_name            string," +
                "turn_date                    string," +
                "transfer_in_code             string," +
                "transfer_in_name             string," +
                "admission_date               string," +
                "bed_no                       string," +
                "bed_name                     string," +
                "ward_id                      string," +
                "ward_name                    string," +
                "wards_id                     string," +
                "wards_name                   string," +
                "dept_code                    string," +
                "dept_name                    string," +
                "hospital_code                string," +
                "hospital_name                string," +
                "chief_complaint              string," +
                "admission_status             string," +
                "current_situation            string," +
                "tcm_four_diagnosis           string," +
                "transfer_intreatment_plan    string," +
                "principle_and_method         string," +
                "needing_attention            string," +
                "transfer_record_typecode     string," +
                "transfer_record_type         string," +
                "transfer_out_dept            string," +
                "indept_name                  string," +
                "transfer_purposes            string," +
                "order_content                string," +
                "decoct_method                string," +
                "tcm_drug_use                 string," +
                "treatment_process            string" +
                ")" + FlinkSourceUtil.getKafkaWith("hdsd00_14_05","App_13_16"));
        // create table : hdsd00_14_01
        tableEnv.executeSql("create table hdsd00_14_01 (       " +
                "pk                       string," +
                "upload_time              string," +
                "status                   string," +
                "empi                     string," +
                "encounter_id             string," +
                "visit_date               string," +
                "visit_domain             string," +
                "visit_id                 string," +
                "visit_times              string," +
                "patient_id               string," +
                "patient_domain           string," +
                "unique_id                string," +
                "xds_type                 string," +
                "xds_name                 string," +
                "effective_time           string," +
                "xds_version              string," +
                "domain_id                string," +
                "patient_type             string," +
                "patient_typecode         string," +
                "inpatient                string," +
                "identify_no              string," +
                "patient_name             string," +
                "sex_code                 string," +
                "sex                      string," +
                "birth                    string," +
                "age                      string," +
                "age_unit                 string," +
                "write_time               string," +
                "author_code              string," +
                "author_name              string," +
                "custodian_code           string," +
                "custodian_name           string," +
                "doctor_signdate          string," +
                "doctor_code              string," +
                "doctor_name              string," +
                "summary_date             string," +
                "admission_date           string," +
                "bed_no                   string," +
                "bed_name                 string," +
                "ward_id                  string," +
                "ward_name                string," +
                "wards_id                 string," +
                "wards_name               string," +
                "dept_code                string," +
                "dept_name                string," +
                "hospital_code            string," +
                "hospital_name            string," +
                "chief_complaint          string," +
                "admission_status         string," +
                "current_situation        string," +
                "tcm_four_diagnosis       string," +
                "treatment_plan           string," +
                "principle_and_method     string," +
                "order_content            string," +
                "decoct_method            string," +
                "tcm_drug_use             string," +
                "treatment_process        string" +
                ")" + FlinkSourceUtil.getKafkaWith("hdsd00_14_01", "App_13_16"));
        // create table : hdsd00_14_04
        tableEnv.executeSql("create table hdsd00_14_04 ( "+
                "pk                     string," +
                "upload_time            string," +
                "status                 string," +
                "empi                   string," +
                "encounter_id           string," +
                "visit_date             string," +
                "patient_id             string," +
                "patient_domain         string," +
                "visit_domain           string," +
                "visit_id               string," +
                "visit_times            string," +
                "unique_id              string," +
                "xds_type               string," +
                "xds_name               string," +
                "effective_time         string," +
                "xds_version            string," +
                "domain_id              string," +
                "patient_type           string," +
                "patient_typecode       string," +
                "inpatient              string," +
                "identify_no            string," +
                "patient_name           string," +
                "sex_code               string," +
                "sex                    string," +
                "age                    string," +
                "age_unit               string," +
                "write_time             string," +
                "author_code            string," +
                "author_name            string," +
                "custodian_code         string," +
                "custodian_name         string," +
                "surgeon_signdate       string," +
                "surgeon_code           string," +
                "surgeon_name           string," +
                "doctor_signdate        string," +
                "doctor_code            string," +
                "doctor_name            string," +
                "contact_number         string," +
                "contact_name           string," +
                "admission_date         string," +
                "out_date               string," +
                "bed_no                 string," +
                "bed_name               string," +
                "ward_id                string," +
                "ward_name              string," +
                "wards_id               string," +
                "wards_name             string," +
                "dept_code              string," +
                "dept_name              string," +
                "hospital_code          string," +
                "hospital_name          string," +
                "record_summary         string," +
                "diagnostic_basis       string," +
                "allergy_history_mark   string," +
                "allergy_history        string," +
                "item_result            string," +
                "oper_indication        string," +
                "contraindication       string," +
                "oper_surgical          string," +
                "consultation_opinion   string," +
                "oper_code              string," +
                "oper_name              string," +
                "target_name            string," +
                "oper_date              string," +
                "anes_method_code       string," +
                "anes_name              string," +
                "needing_attention      string," +
                "oper_key_point         string," +
                "pre_preparation        string" +
                ")" + FlinkSourceUtil.getKafkaWith("hdsd00_14_04","App_13_16"));

        // create table : tdy_list_diag
        super.readTdyListDiag(tableEnv,"App_13_16");
    }
}
